Native Driver Introduction

Welcome to the Python native driver tutorial. This tutorial will keep evolving as feedback is incorporated. Pull requests are encouraged! It is fully open source, so feel free to fork it and deliver it at other meetups!

Check out the documentation and the GitHub repo.

We're going to assume a keyspace named tutorial exists.

In a CQL shell, we can create it like this:

CREATE KEYSPACE IF NOT EXISTS tutorial WITH replication = {'class': 'SimpleStrategy', 'replication_factor': 1};

We'll create the rest of the tables programmatically.


In [ ]:
%load_ext cql

In [ ]:
%cql DROP KEYSPACE IF EXISTS tutorial;
%cql CREATE KEYSPACE IF NOT EXISTS tutorial WITH replication = {'class': 'SimpleStrategy', 'replication_factor': 1};

In [3]:
# here we set up our database connection
from cassandra.cluster import Cluster
cluster = Cluster(["127.0.0.1"])

# a session manages the connection pool for us
session = cluster.connect("tutorial")
print "Connected"


Connected

Table Setup


In [ ]:
# basic table setup
tables = ["photo", "comment"]

# drop the existing ones.  can't use a placeholder for 
# DROP TABLE so we just use python string interpolation
for table in tables:
    session.execute("DROP TABLE if exists %s" % table)
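Because the table name is interpolated straight into the statement, it is worth checking it before it reaches session.execute. Below is a minimal sketch, assuming table names are plain unquoted CQL identifiers (a letter followed by letters, digits, or underscores). drop_table_statement is a hypothetical helper written for this tutorial, not part of the driver.

```python
import re

# Unquoted CQL identifiers: a letter followed by letters, digits, or underscores.
_IDENTIFIER = re.compile(r"[A-Za-z][A-Za-z0-9_]*\Z")

def drop_table_statement(table):
    """Build a DROP TABLE statement, refusing anything that is not a plain identifier."""
    if not _IDENTIFIER.match(table):
        raise ValueError("unsafe table name: %r" % (table,))
    return "DROP TABLE IF EXISTS %s" % table

for table in ["photo", "comment"]:
    print(drop_table_statement(table))
```

The returned string can be passed to session.execute exactly like the interpolated string in the cell above.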

In [8]:
photo = """
CREATE TABLE photo (
  photo_id uuid,
  name text,
  PRIMARY KEY (photo_id)
)
"""

comment = """
CREATE TABLE comment (
  photo_id uuid,
  comment_id timeuuid,
  comment text,
  PRIMARY KEY (photo_id, comment_id)
) WITH CLUSTERING ORDER BY (comment_id DESC)
"""

session.execute(photo)
session.execute(comment)

Prepared Statements

We're going to populate photos using prepared statements.

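Prepared statements are more secure and decrease server load: values are bound separately from the query text, and the server parses the query once and reuses it on every execution.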


In [9]:
import uuid
insert = session.prepare("INSERT INTO photo (photo_id, name) VALUES (?, ?)")

for x in range(100):
    session.execute(insert, (uuid.uuid4(), "test %d" % x))

for photo in session.execute("SELECT * from photo limit 5"):
    print photo


Row(photo_id=UUID('c21ae64a-6ba1-44ea-a223-0536dcbaea60'), name=u'test 66')
Row(photo_id=UUID('9684dc21-a8f1-419e-b42d-2b4b10bb0774'), name=u'test 60')
Row(photo_id=UUID('5274a8ec-cea6-4fdc-a206-88cce4e07afd'), name=u'test 25')
Row(photo_id=UUID('ae0359a5-6e89-47ec-97c7-e25d84c6d041'), name=u'test 1')
Row(photo_id=UUID('64314d48-85cd-46f0-ab39-005aafdd54ee'), name=u'test 71')
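The security benefit comes from sending values separately from the query text. The driver needs a running cluster, so the sketch below swaps in Python's built-in sqlite3 module as a stand-in (purely an assumption for illustration; it happens to accept the same ? placeholders) to contrast bound parameters with string interpolation. hostile_name is a made-up input.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE photo (photo_id TEXT PRIMARY KEY, name TEXT)")

hostile_name = "x'); DROP TABLE photo; --"

# Bound parameters: the value travels separately from the query text,
# so the quote characters are simply stored as data.
conn.execute("INSERT INTO photo (photo_id, name) VALUES (?, ?)", ("1", hostile_name))
stored = conn.execute("SELECT name FROM photo WHERE photo_id = ?", ("1",)).fetchone()[0]
print(stored == hostile_name)

# String interpolation: the same value becomes part of the statement itself.
# We only build the string here; it is never executed.
unsafe = "INSERT INTO photo (photo_id, name) VALUES ('2', '%s')" % hostile_name
print(unsafe)
```

The interpolated string now contains a second statement, which is exactly what binding values through a prepared statement avoids.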

In [ ]:
# let's get a nice visual of some sensor data!

session.execute("DROP TABLE IF EXISTS sensor_data")
sensor_table = """
CREATE TABLE sensor_data (
  sensor_id uuid,
  created_at timeuuid,
  reading int,
  PRIMARY KEY (sensor_id, created_at)
) WITH CLUSTERING ORDER BY (created_at DESC)
"""

session.execute(sensor_table)

from uuid import uuid1, uuid4
from random import randint

insert = session.prepare("INSERT INTO sensor_data (sensor_id, created_at, reading) VALUES (?, ?, ?)")

sid = uuid4()
for x in range(100):
    session.execute(insert, (sid, uuid1(), randint(1, 1000)))
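A timeuuid is a version-1 UUID, so the creation time can be recovered from created_at on the client. Here is a minimal sketch using only the standard library. datetime_from_timeuuid is a hypothetical helper name, and the constant is the number of 100-nanosecond intervals between the UUID epoch (15 October 1582) and the Unix epoch.

```python
import uuid
from datetime import datetime, timedelta

# 100-ns intervals between 1582-10-15 (UUID epoch) and 1970-01-01 (Unix epoch).
UUID_EPOCH_OFFSET = 0x01B21DD213814000

def datetime_from_timeuuid(u):
    """Return the UTC datetime embedded in a version-1 (time-based) UUID."""
    if u.version != 1:
        raise ValueError("not a version-1 (time-based) UUID")
    microseconds = (u.time - UUID_EPOCH_OFFSET) // 10
    return datetime(1970, 1, 1) + timedelta(microseconds=microseconds)

stamp = uuid.uuid1()
print(datetime_from_timeuuid(stamp))
```

This is also why created_at can serve as the clustering column: ordering by the timeuuid orders the readings by time.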

Performance

The Python driver works best when taking advantage of its asynchronous features.

First, we'll insert our sensors. Here we'll use a callback to insert sensor data once the initial sensor entry has been created.


In [10]:
from datetime import datetime
from random import randint

session.execute("TRUNCATE sensor_data")
session.execute("DROP TABLE IF EXISTS sensor")
sensor_table = """
CREATE TABLE sensor (
  sensor_id uuid,
  name text,
  created_at timestamp,
  PRIMARY KEY (sensor_id)
)
"""

session.execute(sensor_table)

"""
from earlier defined table:

    sensor_data:
      sensor_id uuid,
      created_at timeuuid,
      reading int,
"""


Out[10]:
'\nfrom earlier defined table:\n\n    sensor_data:\n      sensor_id uuid,\n      created_at timeuuid,\n      reading int,\n'

In [13]:
insert_sensor = session.prepare("""INSERT INTO sensor (sensor_id, name, created_at) 
                                    VALUES (?, ?, ?)""")

def create_sensor_entries_callback(response, sensor_id):
    print "CALLBACK"

for x in range(10):
    # generate the id up front so it can be passed to the callback as well
    sensor_id = uuid.uuid4()
    sensor_data = (sensor_id, "sensor %d" % x, datetime.now())
    future = session.execute_async(insert_sensor, sensor_data)
    future.add_callback(create_sensor_entries_callback, sensor_id)


CALLBACK
CALLBACK
CALLBACK
CALLBACK
CALLBACK
CALLBACK
CALLBACK
CALLBACK
CALLBACK
CALLBACK
CALLBACK
CALLBACK
CALLBACK
CALLBACK
CALLBACK
CALLBACK
CALLBACK

In [12]:
insert_sensor = session.prepare("""INSERT INTO sensor (sensor_id, name, created_at) 
                                    VALUES (?, ?, ?)""")
insert_sensor_data = session.prepare("""INSERT INTO sensor_data (sensor_id, created_at, reading) 
                                         VALUES (?, ?, ?)""")

# CALLBACK: for each sensor we're going to create 10 sensor entries
def create_sensor_entries_callback(response, sensor_id):
    print "CALLBACK"
    for x in range(10):
        # use uuid.uuid1() so the callback only relies on the `import uuid` from earlier
        session.execute_async(insert_sensor_data, (sensor_id, uuid.uuid1(), randint(1, 1000)))

futures = []
sensor_ids = []

for x in range(10):
    sensor_id = uuid.uuid4()
    print sensor_id
    future = session.execute_async(insert_sensor, (sensor_id, "sensor %d" % x, datetime.now()))
    future.add_callback(create_sensor_entries_callback, sensor_id)
    futures.append(future)
    sensor_ids.append((sensor_id,)) # we'll save this for later as a list of tuples


94610293-2e3c-4f41-a81a-587a805b901b
e692f8bd-a0d6-43e6-8521-f38911640971
d8bd41ff-1d3a-4492-9d47-6775f7d7b277
2ec54f8a-6a96-40c0-97cb-01cc5d358913
1bdeac75-7e12-43ba-80b5-2d38405f9843
CALLBACK
100e26b6-e61e-4936-a762-2db40f1fc35e
CALLBACK
342cecbf-1e2d-4d76-a8ba-33e308ae8fdb
CALLBACK
657f5b3d-b153-41b6-b107-9a9d7889c7df
d8e855da-2638-4154-a24c-4dbe3f55aab6
ec4aeb13-64a4-42c7-966f-bbaaa8c2cb55

In [ ]:
print "This is to clear the rest of the callbacks (ipython notebook issue)"
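Callbacks run on a background thread, so a notebook cell can return before all of them have fired. One way to make completion explicit is to count the callbacks down and block on an event. The driver needs a live cluster, so this sketch swaps in the standard library's concurrent.futures (add_done_callback rather than the driver's add_callback). fake_insert and the counts are hypothetical stand-ins.

```python
import threading
from concurrent.futures import ThreadPoolExecutor, wait
from functools import partial

SENSORS = 5
READINGS_PER_SENSOR = 3

executor = ThreadPoolExecutor(max_workers=4)
lock = threading.Lock()
written = []              # stand-in for rows stored in the database
reading_futures = []      # second-stage writes started from callbacks
callbacks_left = [SENSORS]
callbacks_done = threading.Event()

def fake_insert(row):
    """Hypothetical stand-in for an asynchronous write."""
    with lock:
        written.append(row)

def on_sensor_created(sensor_id, future):
    try:
        future.result()  # re-raises if the first write failed
        for n in range(READINGS_PER_SENSOR):
            f = executor.submit(fake_insert, ("reading", sensor_id, n))
            with lock:
                reading_futures.append(f)
    finally:
        # Count down even on failure so the waiter below is never stranded.
        with lock:
            callbacks_left[0] -= 1
            if callbacks_left[0] == 0:
                callbacks_done.set()

for sensor_id in range(SENSORS):
    future = executor.submit(fake_insert, ("sensor", sensor_id))
    # add_done_callback passes only the future, so bind sensor_id with partial
    future.add_done_callback(partial(on_sensor_created, sensor_id))

callbacks_done.wait()    # every callback has queued its readings
wait(reading_futures)    # every queued reading has been written
executor.shutdown()
print("%d rows written" % len(written))
```

With the driver, the same idea would apply to the futures returned by execute_async inside the callback: keep them, and wait on them before moving to the next cell.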

Concurrency with cassandra.concurrent

We have other options to take advantage of the built-in concurrency. Let's try selecting the most recent reading from each sensor. We can prepare a statement and execute it once for each entry in a list of argument tuples. For example:


In [ ]:
from cassandra.concurrent import execute_concurrent_with_args

select_statement = session.prepare("""SELECT * FROM sensor_data WHERE sensor_id=? 
                                        ORDER BY created_at DESC LIMIT 1""")

print "Sensor IDS:", sensor_ids

result = execute_concurrent_with_args(session, select_statement, sensor_ids)
for x in result:
    print "result:", x
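Each item printed above is a (success, result_or_exception) pair, returned in the same order as the arguments. Below is a small sketch of how such a list could be split into rows and errors. split_concurrent_results is a hypothetical helper, and the sample data is hand-written to have the same shape, not real driver output. Note that failures only come back as pairs when raise_on_first_error=False is passed; otherwise the first failure is raised.

```python
def split_concurrent_results(results):
    """Separate (success, outcome) pairs into a flat list of rows and a list of errors."""
    rows, errors = [], []
    for index, (success, outcome) in enumerate(results):
        if success:
            rows.extend(outcome)              # outcome is an iterable of rows
        else:
            errors.append((index, outcome))   # outcome is the exception
    return rows, errors

# Hand-written stand-in data shaped like the return value described above.
sample = [
    (True, [{"reading": 10}]),
    (False, RuntimeError("timeout")),
    (True, []),
]

rows, errors = split_concurrent_results(sample)
print("%d rows, %d errors" % (len(rows), len(errors)))
```

Keeping the index alongside each error makes it possible to look up which sensor_id failed and retry just that one.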

Understanding Performance Through Tracing

The native driver exposes a convenient means of getting metrics from Cassandra's internals.


In [ ]:
from cassandra.query import SimpleStatement
statement = SimpleStatement("SELECT * from sensor_data WHERE sensor_id=%s LIMIT 1")
result = session.execute(statement, sensor_ids[0], trace=True)

for event in statement.trace.events:
    print event.source_elapsed, event.description

Load Balancing Policies

The native driver will manage the connection pool for you, but by default it uses a RoundRobinPolicy to pick which server it talks to. This means it'll talk to every machine in every datacenter in your cluster. That is OK if you only have one datacenter, but most of the time you'll want to talk only to machines in the datacenter you're in. For further reading, check out the load balancing policies section of the native driver docs.


In [ ]:
from cassandra.policies import DCAwareRoundRobinPolicy, TokenAwarePolicy

# load local_dc string from environment or config in a real codebase
policy = TokenAwarePolicy(DCAwareRoundRobinPolicy(local_dc='US_EAST')) 

token_aware_cluster = Cluster(['127.0.0.1'], load_balancing_policy=policy)
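To make the difference concrete, here is a toy model of the host selection idea. It is not the driver's implementation: the real policies also handle hosts going up and down, remote-datacenter fallback, and token awareness. The addresses and the EU_WEST datacenter name are made up for illustration.

```python
from itertools import cycle, islice

# Hypothetical cluster: (address, datacenter) pairs.
HOSTS = [
    ("10.0.0.1", "US_EAST"),
    ("10.0.0.2", "US_EAST"),
    ("10.1.0.1", "EU_WEST"),
]

def round_robin(hosts):
    """Cycle through every host, regardless of datacenter."""
    return cycle([address for address, _ in hosts])

def dc_aware_round_robin(hosts, local_dc):
    """Cycle through the hosts in local_dc only."""
    local = [address for address, dc in hosts if dc == local_dc]
    if not local:
        raise ValueError("no hosts in datacenter %r" % (local_dc,))
    return cycle(local)

print(list(islice(round_robin(HOSTS), 4)))
print(list(islice(dc_aware_round_robin(HOSTS, "US_EAST"), 4)))
```

The first plan sends every third request across the ocean; the second never leaves US_EAST.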

Lightweight Transactions

Lightweight transactions allow us to avoid race conditions by using Paxos. Let's set up a user table to use in our examples.


In [ ]:
user_table = """
CREATE TABLE user (
    user_id int primary key,
    name text
    )
"""
session.execute("DROP TABLE IF EXISTS user")
session.execute(user_table)
print "User table created: " , user_table

LWT Inserts


In [ ]:
insert = "INSERT INTO user (user_id, name) values (1, 'jon') IF NOT EXISTS"
# what if this ran a millisecond after we just put jon in the system?
insert2 = "INSERT INTO user (user_id, name) values (1, 'steve') IF NOT EXISTS"

print "first query: " , session.execute(insert)

print "second query: " , session.execute(insert2)

LWT Updates


In [ ]:
update_lwt = "UPDATE user SET name = 'steve' where user_id = 1 IF name = 'jon'"
print "update result: ", session.execute(update_lwt)

update_lwt = "UPDATE user SET name = 'joe' where user_id = 1 IF name = 'jon'"
print "update result: ", session.execute(update_lwt)
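The insert and update cells above follow a compare-and-set pattern: the write only happens if a condition on the current row holds, and the caller is told whether it was applied. The toy model below mimics that behavior in memory with a lock. It is only an illustration of the semantics, since Cassandra reaches agreement across replicas with Paxos rather than a local lock, and ToyUserTable is a hypothetical class.

```python
import threading

class ToyUserTable(object):
    """In-memory stand-in that mimics IF NOT EXISTS and IF name = ... checks."""

    def __init__(self):
        self._rows = {}
        self._lock = threading.Lock()

    def insert_if_not_exists(self, user_id, name):
        # The check and the write happen under one lock, so two callers
        # can never both believe they created the row.
        with self._lock:
            if user_id in self._rows:
                return False, self._rows[user_id]
            self._rows[user_id] = name
            return True, None

    def update_if_name(self, user_id, expected, new_name):
        # Only overwrite when the stored name still matches what the caller saw.
        with self._lock:
            current = self._rows.get(user_id)
            if current != expected:
                return False, current
            self._rows[user_id] = new_name
            return True, None

users = ToyUserTable()
print(users.insert_if_not_exists(1, "jon"))
print(users.insert_if_not_exists(1, "steve"))
print(users.update_if_name(1, "jon", "steve"))
print(users.update_if_name(1, "jon", "joe"))
```

Each call returns an (applied, current_value) pair: the second insert and the second update are rejected and report the value that blocked them.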

User Defined Types

User defined types allow you to create complex data structures in Cassandra.


In [ ]:
session.execute("DROP TABLE IF EXISTS users")
session.execute("DROP TYPE IF EXISTS address")

session.execute("CREATE TYPE tutorial.address (street text, zipcode int)")
session.execute("CREATE TABLE users (id int PRIMARY KEY, location frozen<address>)")

cluster = Cluster() # dirty fix for issue with ipython & async
session = cluster.connect("tutorial")

# create a class to map to the "address" UDT
class Address(object):
    def __init__(self, street, zipcode):
        self.street = street
        self.zipcode = zipcode

print cluster.metadata.keyspaces['tutorial']
cluster.register_user_type('tutorial', 'address', Address)

# insert a row using an instance of Address
session.execute("INSERT INTO users (id, location) VALUES (%s, %s)",
                (0, Address("123 Main St.", 78723)))

# results will include Address instances
results = session.execute("SELECT * FROM users")
row = results[0]
print row.id, row.location.street, row.location.zipcode

In [ ]:
session.execute("CREATE TABLE geo_location (user_id uuid, ts timeuuid, geo frozen<tuple<float,float>>, primary key (user_id, ts))")

Named Tuples


In [ ]:
user_id = uuid4()
ts = uuid1()

session.execute("TRUNCATE geo_location")

session.execute("""INSERT INTO geo_location (user_id, ts, geo) 
                    VALUES (%s, %s, (%s, %s))""", 
                  (user_id, ts, 1.0, 2.0))
print session.execute("SELECT * from geo_location")[0]
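Rows come back as named tuples, as the Row(...) output earlier in this tutorial shows, so columns can be read by attribute, by position, or by unpacking. The sketch below builds a stand-in Row with collections.namedtuple rather than querying a cluster. Treating the geo tuple as (latitude, longitude) is an assumption made for illustration.

```python
from collections import namedtuple
from uuid import uuid1, uuid4

# Stand-in for a row returned from geo_location; a real row comes from the driver.
Row = namedtuple("Row", ["user_id", "ts", "geo"])
row = Row(user_id=uuid4(), ts=uuid1(), geo=(1.0, 2.0))

# Access by attribute name or by position.
print(row.geo)
print(row[2] == row.geo)

# The tuple column unpacks like any other Python tuple
# (assuming it holds latitude followed by longitude).
latitude, longitude = row.geo
print("lat=%s lon=%s" % (latitude, longitude))

# Convert to a dict when a mapping is more convenient.
as_dict = row._asdict()
print(sorted(as_dict.keys()))
```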
